package com.example.hadoopdemo.executor.recommend.movie;

import com.example.hadoopdemo.executor.analyze.JobUtil;
import com.example.hadoopdemo.hdfs.HadoopClient;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.springframework.stereotype.Component;

/**
 * Movie recommendation job executor.
 * Algorithm: item-based collaborative filtering.
 *
 * <p>{@link #start(String, String)} runs the pipeline's steps in order through {@link JobUtil},
 * writing each step's output to its own sub-directory under {@code /output/recommend/<jobName>}.
 *
 * @author Ruison
 * @date 2021/12/7
 */
@Component
@AllArgsConstructor
public class RecommendClient {
    /**
     * Default root directory for reduce output; each job writes below {@code OUTPUT_PATH/<jobName>}.
     */
    private static final String OUTPUT_PATH = "/output/recommend";
    // final: both collaborators are supplied once through the Lombok-generated constructor
    // (same parameters, same order as before) and never reassigned.
    private final HadoopClient hadoopClient;
    private final JobUtil jobUtil;

    /**
     * Starts the recommendation pipeline.
     *
     * <p>Returns immediately, without touching HDFS or submitting any job, when either argument
     * is null, empty or whitespace-only.
     *
     * <p>NOTE(review): later steps read the output directories of earlier steps, so this method
     * presumably relies on {@link JobUtil} running each task to completion before returning —
     * TODO confirm against JobUtil#task.
     *
     * @param jobName   job name; also used as the output sub-directory under {@code /output/recommend}
     * @param inputPath path of the input file for step 1
     */
    public void start(String jobName, String inputPath) {
        // isBlank rather than isEmpty: a whitespace-only job name would otherwise map to a directory
        // such as "/output/recommend/ " that gets deleted and rebuilt as if it were a real job.
        if (StringUtils.isBlank(jobName) || StringUtils.isBlank(inputPath)) {
            return;
        }
        // Output directory = OUTPUT_PATH/<jobName>; remove it first if it exists so every run
        // produces fresh results.
        String outputPath = OUTPUT_PATH + "/" + jobName;
        hadoopClient.rmdir(outputPath, null);
        // Step 1 - build the user-to-item rating matrix
        String step1OutPath = outputPath + "/step1";
        jobUtil.task(jobName, inputPath, step1OutPath, Step1.ParseDataStep1.class, Step1.GroupByUserStep2.class, IntWritable.class, Text.class);
        // Step 2 - build the item co-occurrence matrix from step 1's output
        String step2OutPath = outputPath + "/step2";
        jobUtil.task(jobName, step1OutPath, step2OutPath, Step2.ParseDataStep1.class, Step2.CombinationStatisticsStep2.class, Text.class, IntWritable.class);
        // Step 3 - prepare the co-occurrence matrix and the rating matrix for merging.
        // No reducer class is passed for either task (presumably map-only jobs — confirm in JobUtil).
        String step3OutPath1 = outputPath + "/step3_1";
        String step3OutPath2 = outputPath + "/step3_2";
        jobUtil.task(jobName, step1OutPath, step3OutPath1, Step3.UserVectorSplitter.class, null, IntWritable.class, Text.class);
        jobUtil.task(jobName, step2OutPath, step3OutPath2, Step3.CoOccurrenceColumnWrapper.class, null, Text.class, IntWritable.class);
        // Steps 4 and 5 - compute the recommendation results with matrix operations
        // Step 4 - matrix multiplication; both step-3 outputs are passed as one comma-joined
        // input string (assumes JobUtil treats commas as separate input paths — TODO confirm)
        String step4OutPath = outputPath + "/step4";
        jobUtil.task(jobName, step3OutPath1 + "," + step3OutPath2, step4OutPath, Step4Update.IntegrationData.class, Step4Update.Aggregate.class, Text.class, Text.class);
        // Step 5 - matrix addition, producing the final recommendations
        String step5OutPath = outputPath + "/step5";
        jobUtil.task(jobName, step4OutPath, step5OutPath, Step5.RecommendMapper.class, Step5.RecommendReducer.class, Text.class, Text.class);
    }
}
